package net.dubboclub.http.netty;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.NamedThreadFactory;
import com.alibaba.dubbo.common.utils.NetUtils;
import com.alibaba.dubbo.remoting.http.HttpHandler;
import com.alibaba.dubbo.remoting.http.support.AbstractHttpServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


/**
 * @date: 2016/3/7.
 * @author:bieber.
 * @project:dubbo-side.
 * @package:com.alibaba.dubbo.remoting.http.netty.
 * @version:1.0.0
 * @fix:
 * @description: netty的http服务端
 */
public class NettyHttpServer extends AbstractHttpServer {

    private ServerBootstrap serverBootstrap;

    private static final Logger logger = LoggerFactory.getLogger(NettyHttpServer.class);

    //channel holder的mapping
    private static ConcurrentHashMap<Channel, ChannelHolder> channelMapping = new ConcurrentHashMap<Channel, ChannelHolder>();


    private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-netty-http-server-idle-timer", true));

    private ScheduledFuture idleTimer;

    public NettyHttpServer(URL url, HttpHandler handler) {
        super(url, handler);
        long interval = url.getParameter( Constants.HEARTBEAT_KEY, Constants.DEFAULT_HEARTBEAT);
        long timeout = url.getParameter( Constants.HEARTBEAT_TIMEOUT_KEY, interval * 3 );
        if ( timeout < interval * 2 ) {
            throw new IllegalStateException( "heartbeatTimeout < heartbeatInterval * 2" );
        }
        startIdleCheckTimer(interval,timeout);
        startServer(url,handler);
    }

    private void startIdleCheckTimer(long interval,long timeout){
        stopCheckIdelTimer();
        idleTimer = scheduled.scheduleWithFixedDelay(new IdleTask(timeout, channelMapping),interval,interval, TimeUnit.MILLISECONDS);
    }

    private void stopCheckIdelTimer() {
        if (idleTimer != null && ! idleTimer.isCancelled()) {
            try {
                idleTimer.cancel(true);
                scheduled.purge();
            } catch ( Throwable e ) {
                if (logger.isWarnEnabled()) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
        idleTimer =null;
    }

    protected static ConcurrentHashMap<Channel,ChannelHolder> getChannelMapping(){
        return channelMapping;
    }


    private void startServer(URL url,HttpHandler handler){
        final EventLoopGroup bossGroup = new NioEventLoopGroup(1,new NamedThreadFactory("NettyServerBoss",true));
        final EventLoopGroup workerGroup = new NioEventLoopGroup(url.getParameter(Constants.IO_THREADS_KEY,Constants.DEFAULT_IO_THREADS),new NamedThreadFactory("NettyServerWorker",true));
        serverBootstrap = new ServerBootstrap();
        final RestfulHttpHandler restfulHttpHandler = new RestfulHttpHandler(handler,url);
        serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast("decoder",new HttpRequestDecoder())
                        .addLast("decompressor",new HttpContentDecompressor())
                        .addLast("encoder", new HttpResponseEncoder())
                        .addLast("compressor",new HttpContentCompressor())
                        .addLast("handler",restfulHttpHandler);
            }
        }).option(ChannelOption.SO_KEEPALIVE,true);
        try {
            String host = url.getParameter(Constants.ANYHOST_KEY, false)
                    || NetUtils.isInvalidLocalHost(getUrl().getHost())
                    ? NetUtils.ANYHOST : getUrl().getHost();
            InetSocketAddress bindAddress = new InetSocketAddress(host, getUrl().getPort());
            ChannelFuture future = serverBootstrap.bind(bindAddress).sync();
            ChannelFuture closeFuture = future.channel().closeFuture();
            closeFuture.addListener(new GenericFutureListener() {

                @Override
                public void operationComplete(Future future) throws Exception {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                    stopCheckIdelTimer();
                }
            });
        } catch (InterruptedException e) {
            logger.error("Fail to start netty http server.",e);
            throw new IllegalStateException(e);
        }

    }
}
